JAVA Socket编程学习10 您所在的位置:网站首页 socket 粘包 JAVA Socket编程学习10

JAVA Socket编程学习10

2023-08-25 19:08| 来源: 网络整理| 查看: 265

前言:         根据我的第七篇文章http://blog.csdn.net/m0_37739193/article/details/78484577编写了NIO的Socket服务端代码后,接受UDP的数据正常,但是接收的TCP数据却出现了粘包分包/拆包/半包问题,查阅网上资料知道已经有开源的Netty提供了多种支持TCP粘包/拆包的解码器,用来满足用户的不同诉求。

        可是我已经根据Java原生的NIO写完了整个代码框架,要是换成Netty的话就整个框架代码就都得换了,由于嫌麻烦,我就开动自己的大脑,能不能自己通过代码来实现呢?在自己的努力之下好像是可以的哈,基本能把收来的数据处理成自己想要的。

首先模拟一个TCP客户端:

import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.Socket; import java.net.UnknownHostException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class HeavyTCPClient1 { private static ExecutorService tp = Executors.newCachedThreadPool(); private static final int sleep_time = 1000*1000*1000; public static class EchoClient implements Runnable{ public void run(){ Socket client = null; PrintWriter writer = null; BufferedReader reader = null; try { client = new Socket(); client.connect(new InetSocketAddress("localhost", 7788)); writer = new PrintWriter(client.getOutputStream(), true); // writer.print("{'probeid':0,'type':0,'dbopt':0,'data':{ \"client_ip\": \"192.168.102.29\", \"server_ip\": \"74.125.204.113\", \"client_port\": 13698 }}\r\n{'probeid':"); writer.print("0,'type':0,'dbopt':0,'data':{ \"client_ip\": \"192.168.102.29\", \"server_ip\": \"74.125.204.113\", \"client_port\": 13698 }}\r\n{'probeid':0,'type':0,'dbopt':0,'data':{ \"client_ip\": \"192.168.102.29\", \"server_ip\": \"74.125.204.113\", \"client_port\": 13698 }}\r\n{'probeid':0,'type':0,'db"); // writer.print("opt':0,'data':{ \"client_ip\": \"192.168.102.29\", \"server_ip\": \"74.125.204.113\", \"client_port\": 13698 }}\r\n{'probeid':0,'type':0,'dbopt':0,'data':{ \"client_ip\": \"192.168.102.29\", \"s"); // writer.print("erver_ip\": \"74.125.204.113\", \"client_port\": 13698 }}\r\n{'probeid':0,'type':0,'dbopt':0,'data':{ \"client_ip\": \"192.168.102.29\", \"server_ip\": \"74.125.204.113\", \"client_port\": 13698 }}\r\n"); writer.flush(); reader = new BufferedReader(new InputStreamReader(client.getInputStream())); System.out.println("from server: " + reader.readLine()); reader.close(); writer.close(); client.close(); } catch (UnknownHostException e){ e.printStackTrace(); } catch (IOException e){ e.printStackTrace(); } finally { try { if (writer != null) writer.close(); if (reader != null) reader.close(); if (client != null) client.close(); } catch (IOException e){ } } } } public static void main(String args[]) throws IOException { EchoClient ec = new EchoClient(); // for(int i=0;i 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new TimeServer().bind(port); } }

package NettyTest; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class TimeServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }

客户端代码:

package NettyTest; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class TimeClient { public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); //这两行高亮显示 ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new TimeClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 等待客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new TimeClient().connect(port, "127.0.0.1"); } }

package NettyTest; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private int counter; private byte[] req; public TimeClientHandler() { req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx) { ByteBuf message = null; for (int i = 0; i < 100; i++) { message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; //这行高亮显示 System.out.println("Now is : " + body + " ; the counter is : " + ++counter); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 释放资源 ctx.close(); } }两个变化: 1.拿到的msg已经是解码成字符串之后的应答消息 2.新增了两个解码器:第一个是LineBasedFrameDecoder,第二个是StringDecoder。 运行结果: 服务端执行结果如下 The time server receive order : QUERY TIME ORDER ; the counter is : 1 。。。。。。 The time server receive order : QUERY TIME ORDER ; the counter is : 100 客户端运行结果如下 Now is : Thu Feb 20 00:00:14 CST 2014 ; the counter is : 1 。。。。。。 Now is : Thu Feb 20 00:00:14 CST 2014 ; the counter is : 100

注意2:原文章中服务端和客户端的Handler都继承了ChannelHandlerAdapter类,但是当我将代码导入到Myeclipse中时(运行环境为jdk1.7和netty-4.0.53)却报错根本无法运行程序 解决:将ChannelHandlerAdapter改为ChannelInboundHandlerAdapter,后来才知道ChannelHandlerAdapter是ChannelInboundHandlerAdapter的父类(http://blog.csdn.net/linuu/article/details/51315373) 注意3:在代码中有这么行代码System.getProperty("line.separator"),一开始我居然不知道是什么意思,哎,还是我孤陋寡闻了啊。然后上网查了下,这是搜索的结果 在java中存在一些转义字符,比如"\n"为换行符,但是也有一些JDK自带的一些操作符 比如 : System.getProperty("line.separator") 这也是换行符,功能和"\n"是一致的,但是此种写法屏蔽了 Windows和Linux的区别 ,更保险一些.

        程序的运行结果完全符合预期,说明通过使用LineBasedFrameDecoder和StringDecoder成功解决了TCP粘包导致的读半包问题。对于使用者来说,只要将支持半包解码的handler添加到ChannelPipeline中即可,不需要写额外的代码,用户使用起来非常简单。

LineBasedFrameDecoder和StringDecoder的原理分析         LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有“\n”或者“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。         StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的handler。LineBasedFrameDecoder + StringDecoder组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。         如果发送的消息不是以换行符结束的该怎么办呢?或者没有回车换行符,靠消息头中的长度字段来分包怎么办?是不是需要自己写半包解码器?答案是否定的,Netty提供了多种支持TCP粘包/拆包的解码器,用来满足用户的不同诉求。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有